// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//


#pragma once

#include <pollux/exec/container_row_serde.h>
#include <pollux/exec/operator.h>
#include <pollux/exec/operator_utils.h>
#include <pollux/exec/prefix_sort.h>
#include <pollux/exec/row_container.h>
#include <pollux/vector/base_vector.h>

namespace kumo::pollux::exec {
    // Forward declarations of the spiller types. SortBuffer below only refers to
    // them through std::unique_ptr members ('inputSpiller_' and
    // 'outputSpiller_'), so their full definitions are not required in this
    // header.
    class SortInputSpiller;
    class SortOutputSpiller;

    /// A utility class to accumulate data inside and output the sorted result.
    /// Spilling would be triggered if spilling is enabled and memory usage exceeds
    /// limit.
    ///
    /// Call sequence as suggested by the member documentation below: addInput()
    /// is invoked for each input vector, then noMoreInput() once, then
    /// getOutput() to fetch the sorted rows in batches.
    class SortBuffer {
    public:
        /// @param input Row type of the vectors passed to addInput().
        /// @param sortColumnIndices Indices of the sort key columns; presumably
        /// they index into 'input' -- confirm against the constructor definition.
        /// @param sortCompareFlags Compare flags for the sort keys; presumably one
        /// entry per element of 'sortColumnIndices' -- confirm.
        /// @param pool Memory pool exposed through pool().
        /// @param nonReclaimableSection Flag owned by the associated operator that
        /// tells whether this sort buffer is under a non-reclaimable execution
        /// section.
        /// @param prefixSortConfig Configuration settings for prefix-sort.
        /// @param spillConfig Optional spill configuration, nullptr by default.
        /// canSpill() reports whether the stored config is non-null.
        /// @param spillStats Optional spill stats holder, nullptr by default.
        SortBuffer(
            const RowTypePtr &input,
            const std::vector<column_index_t> &sortColumnIndices,
            const std::vector<CompareFlags> &sortCompareFlags,
            pollux::memory::MemoryPool *pool,
            tsan_atomic<bool> *nonReclaimableSection,
            common::PrefixSortConfig prefixSortConfig,
            const common::SpillConfig *spillConfig = nullptr,
            melon::Synchronized<pollux::common::SpillStats> *spillStats = nullptr);

        // Declared here and defined out of line: 'inputSpiller_' and
        // 'outputSpiller_' are std::unique_ptr to types that are only forward
        // declared in this header, so the destructor has to be defined where those
        // types are complete.
        ~SortBuffer();

        /// Adds one input vector to the buffer. Can't be called once noMoreInput()
        /// has been invoked on this sort buffer object.
        void addInput(const VectorPtr &input);

        /// Indicates no more input and triggers either of:
        ///  - In-memory sorting on rows stored in 'data_' if spilling is not enabled.
        ///  - Finish spilling and setup the sort merge reader for the un-spilling
        ///  processing for the output.
        void noMoreInput();

        /// Returns the sorted output rows in batch.
        /// 'maxOutputRows' is presumably the upper bound on the number of rows in
        /// the returned batch -- confirm against the implementation.
        RowVectorPtr getOutput(vector_size_t maxOutputRows);

        /// Indicates if this sort buffer can spill or not.
        /// True iff a spill config is set ('spillConfig_' is not null).
        bool canSpill() const {
            return spillConfig_ != nullptr;
        }

        /// Invoked to spill all the rows from 'data_'.
        void spill();

        /// Returns the memory pool held by this sort buffer.
        memory::MemoryPool *pool() const {
            return pool_;
        }

        /// Returns the estimated size of a single output row, or std::nullopt when
        /// no estimate is available. Presumably backed by
        /// 'estimatedOutputRowSize_' -- confirm against the implementation.
        std::optional<uint64_t> estimateOutputRowSize() const;

    private:
        // Ensures there is sufficient memory reserved to process 'input'.
        void ensureInputFits(const VectorPtr &input);

        // Reserves memory for output processing. If reservation cannot be increased,
        // spills enough to make output fit.
        void ensureOutputFits(vector_size_t outputBatchSize);

        // Reserves memory for sort. If reservation cannot be increased, spills enough
        // to make output fit.
        // NOTE(review): "make output fit" reads like it was copied from
        // ensureOutputFits(); presumably "make sort fit" is meant -- confirm.
        void ensureSortFits();

        // Refreshes 'estimatedOutputRowSize_'. See that member for how the estimate
        // is described.
        void updateEstimatedOutputRowSize();

        // Invoked to initialize or reset the reusable output buffer to get output.
        void prepareOutput(vector_size_t outputBatchSize);

        // Invoked to initialize reader to read the spilled data from storage for
        // output processing.
        void prepareOutputWithSpill();

        // Produces output when nothing has been spilled. Presumably fills
        // 'output_' from the in-memory sorted rows -- confirm.
        void getOutputWithoutSpill();

        // Produces output when data has been spilled. Presumably fills 'output_'
        // from 'spillMerger_' -- confirm.
        void getOutputWithSpill();

        // Spill during input stage.
        void spillInput();

        // Spill during output stage.
        void spillOutput();

        // Finish spill, and we shouldn't get any rows from non-spilled partition as
        // there is only one hash partition for SortBuffer.
        void finishSpill();

        // Returns true if the sort buffer has spilled, regardless of during input or
        // output processing. If hasSpilled() is true, it means the sort buffer is in
        // minimal memory mode and could not be spilled further.
        bool hasSpilled() const;

        // Row type of the input vectors, as passed to the constructor.
        const RowTypePtr input_;

        // Compare flags used for the sort key columns.
        const std::vector<CompareFlags> sortCompareFlags_;

        // Memory pool returned by pool(). Not owned (raw pointer).
        pollux::memory::MemoryPool *const pool_;

        // The flag is passed from the associated operator such as OrderBy or
        // TableWriter to indicate if this sort buffer object is under non-reclaimable
        // execution section or not.
        tsan_atomic<bool> *const nonReclaimableSection_;

        // Configuration settings for prefix-sort.
        const common::PrefixSortConfig prefixSortConfig_;

        // Spill configuration; null means this sort buffer can't spill (see
        // canSpill()).
        const common::SpillConfig *const spillConfig_;

        // Spill stats holder supplied by the caller; the constructor default is
        // nullptr.
        melon::Synchronized<common::SpillStats> *const spillStats_;

        // The column projection map between 'input_' and 'spillerStoreType_' as sort
        // buffer stores the sort columns first in 'data_'.
        std::vector<IdentityProjection> columnMap_;

        // Indicates no more input. Once it is set, addInput() can't be called on this
        // sort buffer object.
        bool noMoreInput_ = false;

        // The number of received input rows.
        uint64_t numInputRows_ = 0;

        // Used to store the input data in row format.
        std::unique_ptr<RowContainer> data_;

        // Row pointers; presumably they point into 'data_' and are arranged in
        // sorted order once sorting is done -- confirm against the implementation.
        std::vector<char *, memory::StlAllocator<char *> > sortedRows_;

        // The data type of the rows stored in 'data_' and spilled on disk. The
        // sort key columns are stored first then the non-sorted data columns.
        RowTypePtr spillerStoreType_;

        // Spiller for the input stage (see spillInput()).
        std::unique_ptr<SortInputSpiller> inputSpiller_;

        // Spiller for the output stage (see spillOutput()).
        std::unique_ptr<SortOutputSpiller> outputSpiller_;

        // Presumably the set of spilled partitions produced by the spillers and
        // consumed when preparing output with spill -- confirm.
        SpillPartitionSet spillPartitionSet_;

        // Used to merge the sorted runs from in-memory rows and spilled rows on disk.
        std::unique_ptr<TreeOfLosers<SpillMergeStream> > spillMerger_;

        // Records the source rows to copy to 'output_' in order.
        std::vector<const RowVector *> spillSources_;

        // Row numbers paired with 'spillSources_'; presumably entry i is the row
        // within spillSources_[i] to copy -- confirm.
        std::vector<vector_size_t> spillSourceRows_;

        // Reusable output vector.
        RowVectorPtr output_;

        // Estimated size of a single output row by using the max
        // 'data_->estimateRowSize()' across all accumulated data set.
        std::optional<uint64_t> estimatedOutputRowSize_{};

        // The number of rows that has been returned.
        uint64_t numOutputRows_{0};
    };
} // namespace kumo::pollux::exec
